added some development tools
[windows-sources.git] / developer / Samples / NET Standard / ParallelExtensionsExtras_Standard / TaskSchedulers / QueuedTaskScheduler.cs
blobcc71ca1d64cb4307d9a14a6a5ca75646b8b22cb2
1 //--------------------------------------------------------------------------
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // File: QueuedTaskScheduler.cs
6 //
7 //--------------------------------------------------------------------------
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.Diagnostics;
12 using System.Linq;
14 namespace System.Threading.Tasks.Schedulers
16 /// <summary>
17 /// Provides a TaskScheduler that provides control over priorities, fairness, and the underlying threads utilized.
18 /// </summary>
19 [DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView))]
20 [DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")]
21 public sealed class QueuedTaskScheduler : TaskScheduler, IDisposable
23 /// <summary>Debug view for the QueuedTaskScheduler.</summary>
24 private class QueuedTaskSchedulerDebugView
26 /// <summary>The scheduler.</summary>
27 private QueuedTaskScheduler _scheduler;
29 /// <summary>Initializes the debug view.</summary>
30 /// <param name="scheduler">The scheduler.</param>
31 public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler)
33 if (scheduler == null) throw new ArgumentNullException("scheduler");
34 _scheduler = scheduler;
37 /// <summary>Gets all of the Tasks queued to the scheduler directly.</summary>
38 public IEnumerable<Task> ScheduledTasks
40 get
42 var tasks = (_scheduler._targetScheduler != null) ?
43 (IEnumerable<Task>)_scheduler._nonthreadsafeTaskQueue :
44 (IEnumerable<Task>)_scheduler._blockingTaskQueue;
45 return tasks.Where(t => t != null).ToList();
49 /// <summary>Gets the prioritized and fair queues.</summary>
50 public IEnumerable<TaskScheduler> Queues
52 get
54 List<TaskScheduler> queues = new List<TaskScheduler>();
55 foreach (var group in _scheduler._queueGroups) queues.AddRange(group.Value);
56 return queues;
61 /// <summary>
62 /// A sorted list of round-robin queue lists. Tasks with the smallest priority value
63 /// are preferred. Priority groups are round-robin'd through in order of priority.
64 /// </summary>
65 private readonly SortedList<int, QueueGroup> _queueGroups = new SortedList<int, QueueGroup>();
66 /// <summary>Cancellation token used for disposal.</summary>
67 private readonly CancellationTokenSource _disposeCancellation = new CancellationTokenSource();
68 /// <summary>
69 /// The maximum allowed concurrency level of this scheduler. If custom threads are
70 /// used, this represents the number of created threads.
71 /// </summary>
72 private readonly int _concurrencyLevel;
73 /// <summary>Whether we're processing tasks on the current thread.</summary>
74 private static ThreadLocal<bool> _taskProcessingThread = new ThreadLocal<bool>();
76 // ***
77 // *** For when using a target scheduler
78 // ***
80 /// <summary>The scheduler onto which actual work is scheduled.</summary>
81 private readonly TaskScheduler _targetScheduler;
82 /// <summary>The queue of tasks to process when using an underlying target scheduler.</summary>
83 private readonly Queue<Task> _nonthreadsafeTaskQueue;
84 /// <summary>The number of Tasks that have been queued or that are running whiel using an underlying scheduler.</summary>
85 private int _delegatesQueuedOrRunning = 0;
87 // ***
88 // *** For when using our own threads
89 // ***
91 /// <summary>The threads used by the scheduler to process work.</summary>
92 private readonly Thread[] _threads;
93 /// <summary>The collection of tasks to be executed on our custom threads.</summary>
94 private readonly BlockingCollection<Task> _blockingTaskQueue;
96 // ***
98 /// <summary>Initializes the scheduler.</summary>
99 public QueuedTaskScheduler() : this(TaskScheduler.Default, 0) { }
101 /// <summary>Initializes the scheduler.</summary>
102 /// <param name="targetScheduler">The target underlying scheduler onto which this sceduler's work is queued.</param>
103 public QueuedTaskScheduler(TaskScheduler targetScheduler) : this(targetScheduler, 0) { }
105 /// <summary>Initializes the scheduler.</summary>
106 /// <param name="targetScheduler">The target underlying scheduler onto which this sceduler's work is queued.</param>
107 /// <param name="maxConcurrencyLevel">The maximum degree of concurrency allowed for this scheduler's work.</param>
108 public QueuedTaskScheduler(
109 TaskScheduler targetScheduler,
110 int maxConcurrencyLevel)
112 // Validate arguments
113 if (targetScheduler == null) throw new ArgumentNullException("underlyingScheduler");
114 if (maxConcurrencyLevel < 0) throw new ArgumentOutOfRangeException("concurrencyLevel");
116 // Initialize only those fields relevant to use an underlying scheduler. We don't
117 // initialize the fields relevant to using our own custom threads.
118 _targetScheduler = targetScheduler;
119 _nonthreadsafeTaskQueue = new Queue<Task>();
121 // If 0, use the number of logical processors. But make sure whatever value we pick
122 // is not greater than the degree of parallelism allowed by the underlying scheduler.
123 _concurrencyLevel = maxConcurrencyLevel != 0 ? maxConcurrencyLevel : Environment.ProcessorCount;
124 if (targetScheduler.MaximumConcurrencyLevel > 0 &&
125 targetScheduler.MaximumConcurrencyLevel < _concurrencyLevel)
127 _concurrencyLevel = targetScheduler.MaximumConcurrencyLevel;
131 /// <summary>Initializes the scheduler.</summary>
132 /// <param name="threadCount">The number of threads to create and use for processing work items.</param>
133 public QueuedTaskScheduler(int threadCount) : this(threadCount, string.Empty, false, ThreadPriority.Normal, ApartmentState.MTA, 0, null, null) { }
135 /// <summary>Initializes the scheduler.</summary>
136 /// <param name="threadCount">The number of threads to create and use for processing work items.</param>
137 /// <param name="threadName">The name to use for each of the created threads.</param>
138 /// <param name="useForegroundThreads">A Boolean value that indicates whether to use foreground threads instead of background.</param>
139 /// <param name="threadPriority">The priority to assign to each thread.</param>
140 /// <param name="threadApartmentState">The apartment state to use for each thread.</param>
141 /// <param name="threadMaxStackSize">The stack size to use for each thread.</param>
142 /// <param name="threadInit">An initialization routine to run on each thread.</param>
143 /// <param name="threadFinally">A finalization routine to run on each thread.</param>
144 public QueuedTaskScheduler(
145 int threadCount,
146 string threadName = "",
147 bool useForegroundThreads = false,
148 ThreadPriority threadPriority = ThreadPriority.Normal,
149 ApartmentState threadApartmentState = ApartmentState.MTA,
150 int threadMaxStackSize = 0,
151 Action threadInit = null,
152 Action threadFinally = null)
154 // Validates arguments (some validation is left up to the Thread type itself).
155 // If the thread count is 0, default to the number of logical processors.
156 if (threadCount < 0) throw new ArgumentOutOfRangeException("concurrencyLevel");
157 else if (threadCount == 0) _concurrencyLevel = Environment.ProcessorCount;
158 else _concurrencyLevel = threadCount;
160 // Initialize the queue used for storing tasks
161 _blockingTaskQueue = new BlockingCollection<Task>();
163 // Create all of the threads
164 _threads = new Thread[threadCount];
165 for (int i = 0; i < threadCount; i++)
167 _threads[i] = new Thread(() => ThreadBasedDispatchLoop(threadInit, threadFinally), threadMaxStackSize)
169 Priority = threadPriority,
170 IsBackground = !useForegroundThreads,
172 if (threadName != null) _threads[i].Name = threadName + " (" + i + ")";
173 _threads[i].SetApartmentState(threadApartmentState);
176 // Start all of the threads
177 foreach (var thread in _threads) thread.Start();
180 /// <summary>The dispatch loop run by all threads in this scheduler.</summary>
181 /// <param name="threadInit">An initialization routine to run when the thread begins.</param>
182 /// <param name="threadFinally">A finalization routine to run before the thread ends.</param>
183 private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally)
185 _taskProcessingThread.Value = true;
186 if (threadInit != null) threadInit();
189 // If the scheduler is disposed, the cancellation token will be set and
190 // we'll receive an OperationCanceledException. That OCE should not crash the process.
193 // If a thread abort occurs, we'll try to reset it and continue running.
194 while (true)
198 // For each task queued to the scheduler, try to execute it.
199 foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
201 // If the task is not null, that means it was queued to this scheduler directly.
202 // Run it.
203 if (task != null)
205 TryExecuteTask(task);
207 // If the task is null, that means it's just a placeholder for a task
208 // queued to one of the subschedulers. Find the next task based on
209 // priority and fairness and run it.
210 else
212 // Find the next task based on our ordering rules...
213 Task targetTask;
214 QueuedTaskSchedulerQueue queueForTargetTask;
215 lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask);
217 // ... and if we found one, run it
218 if (targetTask != null) queueForTargetTask.ExecuteTask(targetTask);
222 catch (ThreadAbortException)
224 // If we received a thread abort, and that thread abort was due to shutting down
225 // or unloading, let it pass through. Otherwise, reset the abort so we can
226 // continue processing work items.
227 if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
229 Thread.ResetAbort();
234 catch (OperationCanceledException) { }
236 finally
238 // Run a cleanup routine if there was one
239 if (threadFinally != null) threadFinally();
240 _taskProcessingThread.Value = false;
244 /// <summary>Gets the number of queues currently activated.</summary>
245 private int DebugQueueCount
249 int count = 0;
250 foreach (var group in _queueGroups) count += group.Value.Count;
251 return count;
255 /// <summary>Gets the number of tasks currently scheduled.</summary>
256 private int DebugTaskCount
260 return (_targetScheduler != null ?
261 (IEnumerable<Task>)_nonthreadsafeTaskQueue : (IEnumerable<Task>)_blockingTaskQueue)
262 .Where(t => t != null).Count();
266 /// <summary>Find the next task that should be executed, based on priorities and fairness and the like.</summary>
267 /// <param name="targetTask">The found task, or null if none was found.</param>
268 /// <param name="queueForTargetTask">
269 /// The scheduler associated with the found task. Due to security checks inside of TPL,
270 /// this scheduler needs to be used to execute that task.
271 /// </param>
272 private void FindNextTask_NeedsLock(out Task targetTask, out QueuedTaskSchedulerQueue queueForTargetTask)
274 targetTask = null;
275 queueForTargetTask = null;
277 // Look through each of our queue groups in sorted order.
278 // This ordering is based on the priority of the queues.
279 foreach (var queueGroup in _queueGroups)
281 var queues = queueGroup.Value;
283 // Within each group, iterate through the queues in a round-robin
284 // fashion. Every time we iterate again and successfully find a task,
285 // we'll start in the next location in the group.
286 foreach (int i in queues.CreateSearchOrder())
288 queueForTargetTask = queues[i];
289 var items = queueForTargetTask._workItems;
290 if (items.Count > 0)
292 targetTask = items.Dequeue();
293 if (queueForTargetTask._disposed && items.Count == 0)
295 RemoveQueue_NeedsLock(queueForTargetTask);
297 queues.NextQueueIndex = (queues.NextQueueIndex + 1) % queueGroup.Value.Count;
298 return;
304 /// <summary>Queues a task to the scheduler.</summary>
305 /// <param name="task">The task to be queued.</param>
306 protected override void QueueTask(Task task)
308 // If we've been disposed, no one should be queueing
309 if (_disposeCancellation.IsCancellationRequested) throw new ObjectDisposedException(GetType().Name);
311 // If the target scheduler is null (meaning we're using our own threads),
312 // add the task to the blocking queue
313 if (_targetScheduler == null)
315 _blockingTaskQueue.Add(task);
317 // Otherwise, add the task to the non-blocking queue,
318 // and if there isn't already an executing processing task,
319 // start one up
320 else
322 // Queue the task and check whether we should launch a processing
323 // task (noting it if we do, so that other threads don't result
324 // in queueing up too many).
325 bool launchTask = false;
326 lock (_nonthreadsafeTaskQueue)
328 _nonthreadsafeTaskQueue.Enqueue(task);
329 if (_delegatesQueuedOrRunning < _concurrencyLevel)
331 ++_delegatesQueuedOrRunning;
332 launchTask = true;
336 // If necessary, start processing asynchronously
337 if (launchTask)
339 Task.Factory.StartNew(ProcessPrioritizedAndBatchedTasks,
340 CancellationToken.None, TaskCreationOptions.None, _targetScheduler);
345 /// <summary>
346 /// Process tasks one at a time in the best order.
347 /// This should be run in a Task generated by QueueTask.
348 /// It's been separated out into its own method to show up better in Parallel Tasks.
349 /// </summary>
350 private void ProcessPrioritizedAndBatchedTasks()
352 bool continueProcessing = true;
353 while (!_disposeCancellation.IsCancellationRequested && continueProcessing)
357 // Note that we're processing tasks on this thread
358 _taskProcessingThread.Value = true;
360 // Until there are no more tasks to process
361 while (!_disposeCancellation.IsCancellationRequested)
363 // Try to get the next task. If there aren't any more, we're done.
364 Task targetTask;
365 lock (_nonthreadsafeTaskQueue)
367 if (_nonthreadsafeTaskQueue.Count == 0) break;
368 targetTask = _nonthreadsafeTaskQueue.Dequeue();
371 // If the task is null, it's a placeholder for a task in the round-robin queues.
372 // Find the next one that should be processed.
373 QueuedTaskSchedulerQueue queueForTargetTask = null;
374 if (targetTask == null)
376 lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask);
379 // Now if we finally have a task, run it. If the task
380 // was associated with one of the round-robin schedulers, we need to use it
381 // as a thunk to execute its task.
382 if (targetTask != null)
384 if (queueForTargetTask != null) queueForTargetTask.ExecuteTask(targetTask);
385 else TryExecuteTask(targetTask);
389 finally
391 // Now that we think we're done, verify that there really is
392 // no more work to do. If there's not, highlight
393 // that we're now less parallel than we were a moment ago.
394 lock (_nonthreadsafeTaskQueue)
396 if (_nonthreadsafeTaskQueue.Count == 0)
398 _delegatesQueuedOrRunning--;
399 continueProcessing = false;
400 _taskProcessingThread.Value = false;
407 /// <summary>Notifies the pool that there's a new item to be executed in one of the round-robin queues.</summary>
408 private void NotifyNewWorkItem() { QueueTask(null); }
410 /// <summary>Tries to execute a task synchronously on the current thread.</summary>
411 /// <param name="task">The task to execute.</param>
412 /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
413 /// <returns>true if the task was executed; otherwise, false.</returns>
414 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
416 // If we're already running tasks on this threads, enable inlining
417 return _taskProcessingThread.Value && TryExecuteTask(task);
420 /// <summary>Gets the tasks scheduled to this scheduler.</summary>
421 /// <returns>An enumerable of all tasks queued to this scheduler.</returns>
422 /// <remarks>This does not include the tasks on sub-schedulers. Those will be retrieved by the debugger separately.</remarks>
423 protected override IEnumerable<Task> GetScheduledTasks()
425 // If we're running on our own threads, get the tasks from the blocking queue...
426 if (_targetScheduler == null)
428 // Get all of the tasks, filtering out nulls, which are just placeholders
429 // for tasks in other sub-schedulers
430 return _blockingTaskQueue.Where(t => t != null).ToList();
432 // otherwise get them from the non-blocking queue...
433 else
435 return _nonthreadsafeTaskQueue.Where(t => t != null).ToList();
439 /// <summary>Gets the maximum concurrency level to use when processing tasks.</summary>
440 public override int MaximumConcurrencyLevel { get { return _concurrencyLevel; } }
442 /// <summary>Initiates shutdown of the scheduler.</summary>
443 public void Dispose()
445 _disposeCancellation.Cancel();
448 /// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
449 /// <returns>The newly created and activated queue at priority 0.</returns>
450 public TaskScheduler ActivateNewQueue() { return ActivateNewQueue(0); }
452 /// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
453 /// <param name="priority">The priority level for the new queue.</param>
454 /// <returns>The newly created and activated queue at the specified priority.</returns>
455 public TaskScheduler ActivateNewQueue(int priority)
457 // Create the queue
458 var createdQueue = new QueuedTaskSchedulerQueue(priority, this);
460 // Add the queue to the appropriate queue group based on priority
461 lock (_queueGroups)
463 QueueGroup list;
464 if (!_queueGroups.TryGetValue(priority, out list))
466 list = new QueueGroup();
467 _queueGroups.Add(priority, list);
469 list.Add(createdQueue);
472 // Hand the new queue back
473 return createdQueue;
476 /// <summary>Removes a scheduler from the group.</summary>
477 /// <param name="queue">The scheduler to be removed.</param>
478 private void RemoveQueue_NeedsLock(QueuedTaskSchedulerQueue queue)
480 // Find the group that contains the queue and the queue's index within the group
481 var queueGroup = _queueGroups[queue._priority];
482 int index = queueGroup.IndexOf(queue);
484 // We're about to remove the queue, so adjust the index of the next
485 // round-robin starting location if it'll be affected by the removal
486 if (queueGroup.NextQueueIndex >= index) queueGroup.NextQueueIndex--;
488 // Remove it
489 queueGroup.RemoveAt(index);
492 /// <summary>A group of queues a the same priority level.</summary>
493 private class QueueGroup : List<QueuedTaskSchedulerQueue>
495 /// <summary>The starting index for the next round-robin traversal.</summary>
496 public int NextQueueIndex = 0;
498 /// <summary>Creates a search order through this group.</summary>
499 /// <returns>An enumerable of indices for this group.</returns>
500 public IEnumerable<int> CreateSearchOrder()
502 for (int i = NextQueueIndex; i < Count; i++) yield return i;
503 for (int i = 0; i < NextQueueIndex; i++) yield return i;
507 /// <summary>Provides a scheduling queue associatd with a QueuedTaskScheduler.</summary>
508 [DebuggerDisplay("QueuePriority = {_priority}, WaitingTasks = {WaitingTasks}")]
509 [DebuggerTypeProxy(typeof(QueuedTaskSchedulerQueueDebugView))]
510 private sealed class QueuedTaskSchedulerQueue : TaskScheduler, IDisposable
512 /// <summary>A debug view for the queue.</summary>
513 private sealed class QueuedTaskSchedulerQueueDebugView
515 /// <summary>The queue.</summary>
516 private readonly QueuedTaskSchedulerQueue _queue;
518 /// <summary>Initializes the debug view.</summary>
519 /// <param name="queue">The queue to be debugged.</param>
520 public QueuedTaskSchedulerQueueDebugView(QueuedTaskSchedulerQueue queue)
522 if (queue == null) throw new ArgumentNullException("queue");
523 _queue = queue;
526 /// <summary>Gets the priority of this queue in its associated scheduler.</summary>
527 public int Priority { get { return _queue._priority; } }
528 /// <summary>Gets the ID of this scheduler.</summary>
529 public int Id { get { return _queue.Id; } }
530 /// <summary>Gets all of the tasks scheduled to this queue.</summary>
531 public IEnumerable<Task> ScheduledTasks { get { return _queue.GetScheduledTasks(); } }
532 /// <summary>Gets the QueuedTaskScheduler with which this queue is associated.</summary>
533 public QueuedTaskScheduler AssociatedScheduler { get { return _queue._pool; } }
536 /// <summary>The scheduler with which this pool is associated.</summary>
537 private readonly QueuedTaskScheduler _pool;
538 /// <summary>The work items stored in this queue.</summary>
539 internal readonly Queue<Task> _workItems;
540 /// <summary>Whether this queue has been disposed.</summary>
541 internal bool _disposed;
542 /// <summary>Gets the priority for this queue.</summary>
543 internal int _priority;
545 /// <summary>Initializes the queue.</summary>
546 /// <param name="priority">The priority associated with this queue.</param>
547 /// <param name="pool">The scheduler with which this queue is associated.</param>
548 internal QueuedTaskSchedulerQueue(int priority, QueuedTaskScheduler pool)
550 _priority = priority;
551 _pool = pool;
552 _workItems = new Queue<Task>();
555 /// <summary>Gets the number of tasks waiting in this scheduler.</summary>
556 internal int WaitingTasks { get { return _workItems.Count; } }
558 /// <summary>Gets the tasks scheduled to this scheduler.</summary>
559 /// <returns>An enumerable of all tasks queued to this scheduler.</returns>
560 protected override IEnumerable<Task> GetScheduledTasks() { return _workItems.ToList(); }
562 /// <summary>Queues a task to the scheduler.</summary>
563 /// <param name="task">The task to be queued.</param>
564 protected override void QueueTask(Task task)
566 if (_disposed) throw new ObjectDisposedException(GetType().Name);
568 // Queue up the task locally to this queue, and then notify
569 // the parent scheduler that there's work available
570 lock (_pool._queueGroups) _workItems.Enqueue(task);
571 _pool.NotifyNewWorkItem();
574 /// <summary>Tries to execute a task synchronously on the current thread.</summary>
575 /// <param name="task">The task to execute.</param>
576 /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
577 /// <returns>true if the task was executed; otherwise, false.</returns>
578 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
580 // If we're using our own threads and if this is being called from one of them,
581 // or if we're currently processing another task on this thread, try running it inline.
582 return _taskProcessingThread.Value && TryExecuteTask(task);
585 /// <summary>Runs the specified ask.</summary>
586 /// <param name="task">The task to execute.</param>
587 internal void ExecuteTask(Task task) { TryExecuteTask(task); }
589 /// <summary>Gets the maximum concurrency level to use when processing tasks.</summary>
590 public override int MaximumConcurrencyLevel { get { return _pool.MaximumConcurrencyLevel; } }
592 /// <summary>Signals that the queue should be removed from the scheduler as soon as the queue is empty.</summary>
593 public void Dispose()
595 if (!_disposed)
597 lock (_pool._queueGroups)
599 // We only remove the queue if it's empty. If it's not empty,
600 // we still mark it as disposed, and the associated QueuedTaskScheduler
601 // will remove the queue when its count hits 0 and its _disposed is true.
602 if (_workItems.Count == 0)
604 _pool.RemoveQueue_NeedsLock(this);
607 _disposed = true;